package com.atguigu.app

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.bean.{CouponAlertInfo, EventLog}
import com.atguigu.constants.GmallConstants
import com.atguigu.util.{MyEsUtil, MyKafkaUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

import scala.util.control.Breaks._

object AlertApp {
  def main(args: Array[String]): Unit = {
    //1.创建Sparkconf
    val sparkConf: SparkConf = new SparkConf().setAppName("AlertApp").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    //3.消费kafka数据
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_EVENT, ssc)

    //4.将数据转成样例类(返回KV类型，k：mid  v：具体数据)
    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH")
    val midToEventLogDStream = kafkaDStream.mapPartitions(partition => {
      partition.map(record => {
        val eventLog: EventLog = JSON.parseObject(record.value(), classOf[EventLog])

        //补全时间字段
        val times: String = sdf.format(new Date(eventLog.ts))
        eventLog.logDate = times.split(" ")(0)
        eventLog.logHour = times.split(" ")(1)

        (eventLog.mid, eventLog)
      })
    })

    //5.开窗
    val midToEventLogWindowDStream = midToEventLogDStream.window(Minutes(5))

    //6.分组聚合（mid）
    val midToIterEventLogDStream: DStream[(String, Iterable[EventLog])] = midToEventLogWindowDStream.groupByKey()

    //7.根据条件筛选数据
    /**
      * 同一设备，（按照mid做聚合，groupBykey） 再聚合
      * 5分钟内（证明统计的数据是五分钟的数据，开一个五分钟的窗口） 先开窗
      * 三次及以上用不同账号登录并领取优惠劵，（判断用户是否有领优惠券行为，然后对领优惠券的用户做去重，利用set（幂等性去重）统计不同的用户数是否大于        等于三，利用set集合中的元素个数来判断）
      * 并且过程中没有浏览商品。（判断用户是否浏览商品，一旦浏览商品，则不符合以上要求，数据就不做处理）
      * 达到以上要求则产生一条预警日志。并且同一设备，每分钟只记录一次预警（幂等性，ES->docId相同的话则会覆盖掉数据，只保留一条，docid=》mid+精确到       分钟的时间）。
      */
    val boolToCouponAlertInfoDStream: DStream[(Boolean, CouponAlertInfo)] = midToIterEventLogDStream.mapPartitions(partition => {
      partition.map { case (mid, iter) =>
        //创建一个Set集合用来保存领优惠券的用户id
        val uids: util.HashSet[String] = new util.HashSet[String]()
        //创建一个set集合用来保存领优惠券所涉及的商品id
        val itemIds: util.HashSet[String] = new util.HashSet[String]()
        //创建用来存放用户涉及的事件
        val events: util.ArrayList[String] = new util.ArrayList[String]()


        //创建一个标志位，用来判断用户是否有浏览商品行为
        var bool: Boolean = true

        //遍历迭代器，获取到每一条数据进行判断
        breakable {
          iter.foreach(log => {
            //添加用户涉及行为
            events.add(log.evid)
            //判断用户是否涉及浏览商品行为
            if ("clickItem".equals(log.evid)) {
              //有浏览商品行为
              bool = false
              break()
            } else if ("coupon".equals(log.evid)) {
              //五分钟内同一设备没有浏览商品但是领优惠券了
              uids.add(log.uid)
              //添加领优惠券涉及商品的id
              itemIds.add(log.itemid)
            }
          })
        }
        //根据去重后的用户个数以及是否有浏览商品行为去重
        //生成疑似预警日志
        (uids.size() >= 3 & bool, CouponAlertInfo(mid, uids, itemIds, events, System.currentTimeMillis()))
      }
    })

    //8.生成预警日志
    val couponAlertInfoDStream: DStream[CouponAlertInfo] = boolToCouponAlertInfoDStream.filter(_._1).map(_._2)

    couponAlertInfoDStream.print()

    //9.将预警日志写入Es
    couponAlertInfoDStream.foreachRDD(rdd=>{
      rdd.foreachPartition(partition=>{
        val list: List[(String, CouponAlertInfo)] = partition.toList.map(log => {
          (log.mid + log.ts / 1000 / 60, log)
        })
        MyEsUtil.insertBulk(GmallConstants.ES_ALERT+"1126",list)
      })
    })

    //10.开启任务
    ssc.start()

    //11.阻塞任务
    ssc.awaitTermination()
  }

}
